package com.cygsunri.event.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.cygsunri.event.entity.EventInfo;
import com.cygsunri.event.websocket.service.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * 事项推送服务
 */
@Slf4j
@Component
public class EventPushService extends Thread {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private EventService eventService;

    @Value("${task.event.alarm_key}")
    private String alarm_key;

    @Value("${task.event.batch}")
    private int batch;

    /**
     * 事项推送线程
     */
    @Override
    public void run() {
        while (true) {
            try {
                long a = System.currentTimeMillis();
                List<String> list = redisTemplate.opsForList().range(alarm_key, 0, batch - 1);
                if (list != null && list.size() != 0) {
                    redisTemplate.opsForList().trim(alarm_key, batch, -1);
                    analyseEvent(list);
                    log.info("处理{}条事项，耗时{}ms", list.size(), System.currentTimeMillis() - a);
                } else {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 分析事项，推送到前台
     */
    private void analyseEvent(List<String> list) {
        List<EventInfo> eventInfoList = new ArrayList<>();
        list.forEach(l -> {
            String[] msg = l.split("/");
            Long time = Long.parseLong(msg[0]);
            String code = msg[1];
            Double data = Double.parseDouble(msg[2]);
            Integer type = Integer.parseInt(msg[3]);

            EventInfo cacheInfo = eventService.getYxInfoMapping(code);
            if (cacheInfo == null) return;

            String uuid = UUID.randomUUID().toString().replace("-", "");
            //缓存取出的实体必须重新声明
            EventInfo eventInfo = new EventInfo()
                    .setId(uuid)
                    .setKey(uuid)
                    .setMeasurementID(cacheInfo.getMeasurementID())
                    .setMeasurementDesc(cacheInfo.getMeasurementDesc())
                    .setPsrId(cacheInfo.getPsrId())
                    .setPsrDesc(cacheInfo.getPsrDesc())
                    .setSubstationID(cacheInfo.getSubstationID())
                    .setSubstationDesc(cacheInfo.getSubstationDesc())
                    .setTimeStamp(time)
                    .setTxtInfo(data == 1 ? "动作" : "复归")
                    .setType(type)
                    .setIsConfirm(false);
            eventInfoList.add(eventInfo);
        });
        WebSocketServer.broadcast(JSON.toJSONString(eventInfoList, SerializerFeature.DisableCircularReferenceDetect));
        log.info("推送告警事项{}", JSON.toJSONString(eventInfoList, SerializerFeature.DisableCircularReferenceDetect));
        eventService.insertEventList(eventInfoList);
    }
}
